// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! API implementation for submitting transactions.

use crate::{
	transaction::{
		api::TransactionApiServer,
		error::Error,
		event::{TransactionBlock, TransactionDropped, TransactionError, TransactionEvent},
	},
	SubscriptionTaskExecutor,
};

use codec::Decode;
use futures::{StreamExt, TryFutureExt};
use jsonrpsee::{core::async_trait, PendingSubscriptionSink};

use super::metrics::{InstanceMetrics, Metrics};

use sc_rpc::utils::{RingBuffer, Subscription};
use sc_transaction_pool_api::{
	error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
	TransactionStatus,
};
use sp_blockchain::HeaderBackend;
use sp_core::Bytes;
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;

/// Log target passed to the `log` macros by the transaction RPC code in this file.
pub(crate) const LOG_TARGET: &str = "rpc-spec-v2";

/// An API for transaction RPC calls.
///
/// Bundles the handles that `submit_and_watch` needs: the client, the
/// transaction pool, the executor used to run subscription tasks and the
/// optional metrics.
pub struct Transaction<Pool, Client> {
	/// Substrate client, queried for the best block hash when a transaction is submitted.
	client: Arc<Client>,
	/// Transactions pool that decoded extrinsics are submitted to and watched on.
	pool: Arc<Pool>,
	/// Executor to spawn subscriptions.
	executor: SubscriptionTaskExecutor,
	/// Metrics for transactions; cloned into a fresh `InstanceMetrics` for
	/// every `submit_and_watch` call.
	metrics: Option<Metrics>,
}

impl<Pool, Client> Transaction<Pool, Client> {
	/// Creates a new [`Transaction`].
	pub fn new(
		client: Arc<Client>,
		pool: Arc<Pool>,
		executor: SubscriptionTaskExecutor,
		metrics: Option<Metrics>,
	) -> Self {
		Transaction { client, pool, executor, metrics }
	}
}

/// Source passed to the transaction pool for every transaction submitted
/// through `submit_and_watch`.
///
/// Currently we treat all RPC transactions as externals.
///
/// Possibly in the future we could allow opt-in for special treatment
/// of such transactions, so that the block authors can inject
/// some unique transactions via RPC and have them included in the pool.
const TX_SOURCE: TransactionSource = TransactionSource::External;

#[async_trait]
impl<Pool, Client> TransactionApiServer<BlockHash<Pool>> for Transaction<Pool, Client>
where
	Pool: TransactionPool + Sync + Send + 'static,
	Pool::Hash: Unpin,
	<Pool::Block as BlockT>::Hash: Unpin,
	Client: HeaderBackend<Pool::Block> + Send + Sync + 'static,
{
	/// Decodes `xt`, submits it to the transaction pool and forwards the pool's
	/// status updates to the subscriber as `TransactionEvent`s.
	///
	/// This method only builds the future and hands it to
	/// `sc_rpc::utils::spawn_subscription_task`; nothing is returned to the caller.
	/// Failures are reported over the subscription instead:
	/// - if `xt` cannot be decoded, a single `Invalid` event is sent;
	/// - if the pool submission fails, the error is converted into an event and sent.
	///
	/// If the pending subscription cannot be accepted, the future ends without
	/// sending anything.
	fn submit_and_watch(&self, pending: PendingSubscriptionSink, xt: Bytes) {
		let client = self.client.clone();
		let pool = self.pool.clone();

		// Get a new transaction metrics instance and increment the counter.
		// This happens here, outside of the future built below.
		let mut metrics = InstanceMetrics::new(self.metrics.clone());

		let fut = async move {
			// Decode the raw bytes into the pool's transaction type.
			let decoded_extrinsic = match TransactionFor::<Pool>::decode(&mut &xt[..]) {
				Ok(decoded_extrinsic) => decoded_extrinsic,
				Err(e) => {
					log::debug!(target: LOG_TARGET, "Extrinsic bytes cannot be decoded: {:?}", e);

					// The subscription must be accepted before the failure can be
					// reported over it; give up if that is not possible.
					let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };

					let event = TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
						error: "Extrinsic bytes cannot be decoded".into(),
					});

					metrics.register_event(&event);

					// The transaction is invalid.
					let _ = sink.send(&event).await;
					return
				},
			};

			// The transaction is submitted at the client's current best block.
			let best_block_hash = client.info().best_hash;

			// Build the submission future; it is awaited further down, after the
			// subscription has been accepted. Errors that `into_pool_error` cannot
			// convert are wrapped as `Error::Verification`.
			let submit = pool
				.submit_and_watch(best_block_hash, TX_SOURCE, decoded_extrinsic)
				.map_err(|e| {
					e.into_pool_error()
						.map(Error::from)
						.unwrap_or_else(|e| Error::Verification(Box::new(e)))
				});

			let Ok(sink) = pending.accept().await.map(Subscription::from) else {
				return;
			};

			match submit.await {
				Ok(stream) => {
					// Translate each pool status with `handle_event`; statuses that map
					// to `None` are filtered out, every other event is registered with
					// the metrics before being forwarded.
					let stream = stream
						.filter_map(|event| {
							let event = handle_event(event);

							event.as_ref().inspect(|event| {
								metrics.register_event(event);
							});

							async move { event }
						})
						.boxed();

					// If the subscription is too slow older events will be overwritten.
					sink.pipe_from_stream(stream, RingBuffer::new(3)).await;
				},
				Err(err) => {
					// We have not created a `Watcher` for the tx. Make sure the
					// error is still propagated as an event.
					let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();

					metrics.register_event(&event);

					_ = sink.send(&event).await;
				},
			};
		};

		sc_rpc::utils::spawn_subscription_task(&self.executor, fut);
	}
}

/// Handle events generated by the transaction-pool and convert them
/// to the new API expected state.
#[inline]
fn handle_event<Hash: Clone, BlockHash: Clone>(
	event: TransactionStatus<Hash, BlockHash>,
) -> Option<TransactionEvent<BlockHash>> {
	match event {
		TransactionStatus::Ready | TransactionStatus::Future =>
			Some(TransactionEvent::<BlockHash>::Validated),
		TransactionStatus::InBlock((hash, index)) =>
			Some(TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock { hash, index }))),
		TransactionStatus::Retracted(_) => Some(TransactionEvent::BestChainBlockIncluded(None)),
		TransactionStatus::FinalityTimeout(_) =>
			Some(TransactionEvent::Dropped(TransactionDropped {
				error: "Maximum number of finality watchers has been reached".into(),
			})),
		TransactionStatus::Finalized((hash, index)) =>
			Some(TransactionEvent::Finalized(TransactionBlock { hash, index })),
		TransactionStatus::Usurped(_) => Some(TransactionEvent::Invalid(TransactionError {
			error: "Extrinsic was rendered invalid by another extrinsic".into(),
		})),
		TransactionStatus::Dropped => Some(TransactionEvent::Dropped(TransactionDropped {
			error: "Extrinsic dropped from the pool due to exceeding limits".into(),
		})),
		TransactionStatus::Invalid => Some(TransactionEvent::Invalid(TransactionError {
			error: "Extrinsic marked as invalid".into(),
		})),
		// These are the events that are not supported by the new API.
		TransactionStatus::Broadcast(_) => None,
	}
}
